feat: Support exact size config for BatchCoalescer #8112
zhuqi-lucas wants to merge 5 commits into apache:main
Conversation
FYI @alamb, we can trigger a benchmark run for this PR. I set the benchmark to use exact_size = false, which is consistent with the original DataFusion implementation.
We currently reserve target_batch_size capacity, but for the non-exact-size logic I am thinking about how we can improve it: some output batches may be huge compared to target_batch_size, which may affect performance.
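To illustrate the concern, here is a minimal sketch assuming the non-strict behaviour documented in this PR, where a single large input produces one output batch far bigger than the reserved `target_batch_size`. The `with_exact_size` builder is the method proposed here (not yet in a released crate), the `arrow_select::coalesce` path follows the file changed in this PR, and the row counts are illustrative:

```rust
use std::sync::Arc;
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::UInt32, false)]));
    let big = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(UInt32Array::from_iter_values(0..8192))],
    )
    .unwrap();

    // Reserve capacity for 1024 rows per output batch, but opt out of exact sizing.
    let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1024).with_exact_size(false);
    coalescer.push_batch(big).unwrap();

    // Non-strict mode (as documented in this PR) emits the whole buffered set once
    // it reaches the target, so this single output is 8x the target and the
    // in-progress buffers must grow well past the reserved 1024-row capacity.
    let out = coalescer.next_completed_batch().unwrap();
    assert_eq!(out.num_rows(), 8192);
}
```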
alamb
left a comment
Nice @zhuqi-lucas -- I left a suggestion - let me know what you think
// If we have reached the target batch size, finalize the buffered batch
if self.buffered_rows >= self.target_batch_size {
    self.finish_buffered_batch()?;
// If we've reached or exceeded target, emit the whole buffered set
If we go over the target size, I think it means the underlying storage will reallocate (and thus copy the data).
I think the more performant way to do this is: if adding `num_rows` to the output would go over `target_rows`, emit early (even though some of the allocated space is not yet used).
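A minimal, self-contained sketch of that rule, using illustrative names rather than the PR's actual fields (the real patch appears in the reply below):

```rust
/// Returns true when appending `incoming_rows` to the buffer would exceed
/// `target_batch_size`, i.e. when the buffered rows should be emitted first
/// so the reserved capacity is never exceeded (and never reallocated).
fn should_emit_before_append(
    buffered_rows: usize,
    incoming_rows: usize,
    target_batch_size: usize,
) -> bool {
    buffered_rows > 0 && buffered_rows + incoming_rows > target_batch_size
}

fn main() {
    // With 3 rows buffered and a target of 4, a 2-row input triggers an early
    // emit of the 3 buffered rows; the 2 new rows are then buffered on their own.
    assert!(should_emit_before_append(3, 2, 4));
    // An empty buffer never triggers an early emit.
    assert!(!should_emit_before_append(0, 2, 4));
}
```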
Hi @alamb, thank you for the review and the good suggestion!
I tried a patch that does this on top of the current PR, but performance did not improve; the best benchmark result is still the exact-size batch emit. 🤔
diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index be2bbcafb6..93bdf86a2d 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -100,16 +100,23 @@ use primitive::InProgressPrimitiveArray;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
-/// // Non-strict: produce batch once buffered >= target, batch may be larger than target
+/// // Non-strict: optimized for memory efficiency, may emit early to avoid reallocation
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4).with_exact_size(false);
/// coalescer.push_batch(batch1).unwrap();
/// // still < 4 rows buffered
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
-/// // now buffered >= 4, non-strict mode emits whole buffered set (5 rows)
+/// // buffered=3, new=2, total would be 5 > 4, so emit buffered 3 rows first
/// let finished = coalescer.next_completed_batch().unwrap();
-/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4, 5])).unwrap();
+/// let expected = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// assert_eq!(finished, expected);
+///
+/// // The remaining 2 rows from batch2 are now buffered
+/// assert!(coalescer.next_completed_batch().is_none());
+/// coalescer.finish_buffered_batch().unwrap();
+/// let remaining = coalescer.next_completed_batch().unwrap();
+/// let expected = record_batch!(("a", Int32, [4, 5])).unwrap();
+/// assert_eq!(remaining, expected);
/// ```
///
/// # Background
@@ -145,16 +152,21 @@ use primitive::InProgressPrimitiveArray;
///
/// 1. Output rows are produced in the same order as the input rows
///
-/// 2. The output is a sequence of batches, with all but the last being at exactly
-/// `target_batch_size` rows.
+/// 2. The output batch sizes depend on the `exact_size` setting:
+/// - In strict mode: all but the last batch have exactly `target_batch_size` rows
+/// - In non-strict mode: batch sizes are optimized to avoid memory reallocation
///
/// Notes on `exact_size`:
///
/// - `exact_size == true` (strict): output batches are produced so that all but
/// the final batch have exactly `target_batch_size` rows (default behavior).
-/// - `exact_size == false` (non-strict, default for this crate): output batches
-/// will be produced when the buffered rows are >= `target_batch_size`. The
-/// produced batch may be larger than `target_batch_size` (i.e., size >= target).
+/// - `exact_size == false` (non-strict): output batches are optimized for memory
+/// efficiency. Batches are emitted early to avoid buffer reallocation when adding
+/// new data would exceed the target size. Large input batches are split into
+/// target-sized chunks to prevent excessive memory allocation. This may result in
+/// output batches that are smaller than `target_batch_size`, but the algorithm
+/// ensures batches are as close to the target size as possible while maintaining
+/// memory efficiency. Small batches only occur to avoid costly memory operations.
#[derive(Debug)]
pub struct BatchCoalescer {
/// The input schema
@@ -320,7 +332,29 @@ impl BatchCoalescer {
self.finish_buffered_batch()?;
}
} else {
- // Non-strict: append all remaining rows; if buffered >= target, emit them
+ // Non-strict: emit early if adding num_rows would exceed target to avoid reallocation
+ if self.buffered_rows > 0 && self.buffered_rows + num_rows > self.target_batch_size {
+ // Emit the current buffered data before processing the new batch
+ // This avoids potential reallocation in the underlying storage
+ self.finish_buffered_batch()?;
+ }
+
+ // If num_rows is larger than target_batch_size, split it into target-sized chunks
+ // to avoid allocating overly large buffers
+ while num_rows > self.target_batch_size {
+ let chunk_size = self.target_batch_size;
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, chunk_size)?;
+ }
+ self.buffered_rows += chunk_size;
+ offset += chunk_size;
+ num_rows -= chunk_size;
+
+ // Emit this full chunk immediately
+ self.finish_buffered_batch()?;
+ }
+
+ // Now append remaining rows (guaranteed to be <= target_batch_size) to buffer
if num_rows > 0 {
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows(offset, num_rows)?;
@@ -328,7 +362,7 @@ impl BatchCoalescer {
self.buffered_rows += num_rows;
}
- // If we've reached or exceeded target, emit the whole buffered set
+ // If the current buffer has reached or exceeded target, emit it
if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}
@@ -1381,38 +1415,62 @@ mod tests {
coalescer.push_batch(batch1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
- // push second batch (2 rows) -> buffered becomes 5 >= 4, non-strict emits all 5 rows
+ // push second batch (2 rows) -> buffered=3, new=2, 3+2=5 > 4
+ // NEW BEHAVIOR: emit buffered 3 rows first to avoid reallocation
coalescer.push_batch(batch2).unwrap();
- let out = coalescer
+ let out1 = coalescer
.next_completed_batch()
- .expect("expected a completed batch");
- assert_eq!(out.num_rows(), 5);
-
- // check contents equal to concatenation of 0..5
- let expected = uint32_batch(0..5);
- let actual = normalize_batch(out);
- let expected = normalize_batch(expected);
- assert_eq!(expected, actual);
+ .expect("expected first batch");
+ assert_eq!(out1.num_rows(), 3); // Only the first batch (early emit)
+
+ // The second batch should be buffered now
+ assert!(coalescer.next_completed_batch().is_none());
+
+ // Finish to get the remaining buffered data
+ coalescer.finish_buffered_batch().unwrap();
+ let out2 = coalescer
+ .next_completed_batch()
+ .expect("expected second batch");
+ assert_eq!(out2.num_rows(), 2); // The second batch
+
+ // check contents
+ let expected1 = uint32_batch(0..3);
+ let expected2 = uint32_batch(3..5);
+ assert_eq!(normalize_batch(out1), normalize_batch(expected1));
+ assert_eq!(normalize_batch(out2), normalize_batch(expected2));
}
#[test]
fn test_non_strict_single_large_batch() {
- // one large batch > target: in non-strict mode whole batch should be emitted
+ // one large batch > target: should be split into target-sized chunks
let batch = uint32_batch(0..4096);
let schema = Arc::clone(&batch.schema());
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1000).with_exact_size(false);
coalescer.push_batch(batch).unwrap();
- let out = coalescer
- .next_completed_batch()
- .expect("expected a completed batch");
- assert_eq!(out.num_rows(), 4096);
-
- // compare to expected
- let expected = uint32_batch(0..4096);
- let actual = normalize_batch(out);
- let expected = normalize_batch(expected);
- assert_eq!(expected, actual);
+
+ // NEW BEHAVIOR: large batch should be split into chunks of target_batch_size
+ // 4096 / 1000 = 4 full batches + 96 remainder
+ let mut outputs = vec![];
+ while let Some(b) = coalescer.next_completed_batch() {
+ outputs.push(b);
+ }
+
+ assert_eq!(outputs.len(), 4); // 4 full batches emitted immediately
+
+ // Each should be exactly 1000 rows
+ for (i, out) in outputs.iter().enumerate() {
+ assert_eq!(out.num_rows(), 1000);
+ let expected = uint32_batch((i * 1000) as u32..((i + 1) * 1000) as u32);
+ assert_eq!(normalize_batch(out.clone()), normalize_batch(expected));
+ }
+
+ // Remaining 96 rows should be buffered
+ coalescer.finish_buffered_batch().unwrap();
+ let final_batch = coalescer.next_completed_batch().expect("expected final batch");
+ assert_eq!(final_batch.num_rows(), 96);
+ let expected_final = uint32_batch(4000..4096);
+ assert_eq!(normalize_batch(final_batch), normalize_batch(expected_final));
}
#[test]
@@ -1439,71 +1497,104 @@ mod tests {
#[test]
fn test_non_strict_multiple_emits_over_time() {
- // multiple pushes that each eventually push buffered >= target and emit
+ // multiple pushes with early emit behavior
let b1 = uint32_batch(0..3); // 3
- let b2 = uint32_batch(3..5); // 2 -> 3+2=5 emit (first)
- let b3 = uint32_batch(5..8); // 3
- let b4 = uint32_batch(8..10); // 2 -> 3+2=5 emit (second)
+ let b2 = uint32_batch(3..5); // 2 -> 3+2=5 > 4, emit 3 first
+ let b3 = uint32_batch(5..8); // 3 -> 2+3=5 > 4, emit 2 first
+ let b4 = uint32_batch(8..10); // 2 -> 3+2=5 > 4, emit 3 first
let schema = Arc::clone(&b1.schema());
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 4).with_exact_size(false);
+ // Push first batch (3 rows) -> buffered
coalescer.push_batch(b1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
+ // Push second batch (2 rows) -> 3+2=5 > 4, emit buffered 3 rows first
coalescer.push_batch(b2).unwrap();
let out1 = coalescer
.next_completed_batch()
.expect("expected first batch");
- assert_eq!(out1.num_rows(), 5);
- assert_eq!(normalize_batch(out1), normalize_batch(uint32_batch(0..5)));
+ assert_eq!(out1.num_rows(), 3);
+ assert_eq!(normalize_batch(out1), normalize_batch(uint32_batch(0..3)));
+ // Now 2 rows from b2 are buffered, push b3 (3 rows) -> 2+3=5 > 4, emit 2 rows first
coalescer.push_batch(b3).unwrap();
- assert!(coalescer.next_completed_batch().is_none());
-
- coalescer.push_batch(b4).unwrap();
let out2 = coalescer
.next_completed_batch()
.expect("expected second batch");
- assert_eq!(out2.num_rows(), 5);
- assert_eq!(normalize_batch(out2), normalize_batch(uint32_batch(5..10)));
+ assert_eq!(out2.num_rows(), 2);
+ assert_eq!(normalize_batch(out2), normalize_batch(uint32_batch(3..5)));
+
+ // Now 3 rows from b3 are buffered, push b4 (2 rows) -> 3+2=5 > 4, emit 3 rows first
+ coalescer.push_batch(b4).unwrap();
+ let out3 = coalescer
+ .next_completed_batch()
+ .expect("expected third batch");
+ assert_eq!(out3.num_rows(), 3);
+ assert_eq!(normalize_batch(out3), normalize_batch(uint32_batch(5..8)));
+
+ // Finish to get remaining 2 rows from b4
+ coalescer.finish_buffered_batch().unwrap();
+ let out4 = coalescer
+ .next_completed_batch()
+ .expect("expected fourth batch");
+ assert_eq!(out4.num_rows(), 2);
+ assert_eq!(normalize_batch(out4), normalize_batch(uint32_batch(8..10)));
}
#[test]
fn test_non_strict_large_then_more_outputs() {
- // first push a large batch (should produce one big output), then push more small ones to produce another
+ // first push a large batch (should be split), then push more small ones
let big = uint32_batch(0..5000);
let small1 = uint32_batch(5000..5002); // 2
- let small2 = uint32_batch(5002..5005); // 3 -> 2+3=5 >=4 emit
+ let small2 = uint32_batch(5002..5005); // 3 -> 2+3=5 > 4, emit 2 first
let schema = Arc::clone(&big.schema());
- // Use small target (4) so that small1 + small2 will trigger an emit
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 4).with_exact_size(false);
- // push big: non-strict mode should emit the whole big batch (5000 rows)
+ // push big: should be split into chunks of 4
+ // 5000 / 4 = 1250 full batches
coalescer.push_batch(big).unwrap();
- let out_big = coalescer
- .next_completed_batch()
- .expect("expected big batch");
- assert_eq!(out_big.num_rows(), 5000);
- assert_eq!(
- normalize_batch(out_big),
- normalize_batch(uint32_batch(0..5000))
- );
- // push small1 (2 rows) -> not enough yet
+ let mut big_outputs = vec![];
+ while let Some(b) = coalescer.next_completed_batch() {
+ big_outputs.push(b);
+ }
+
+ assert_eq!(big_outputs.len(), 1250); // 1250 batches of 4 rows each
+ for (i, out) in big_outputs.iter().enumerate() {
+ assert_eq!(out.num_rows(), 4);
+ let start = i * 4;
+ let end = (i + 1) * 4;
+ let expected = uint32_batch(start as u32..end as u32);
+ assert_eq!(normalize_batch(out.clone()), normalize_batch(expected));
+ }
+
+ // push small1 (2 rows) -> buffered
coalescer.push_batch(small1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
- // push small2 (3 rows) -> now buffered = 2 + 3 = 5 >= 4, non-strict emits all 5 rows
+ // push small2 (3 rows) -> 2+3=5 > 4, emit buffered 2 rows first
coalescer.push_batch(small2).unwrap();
- let out_small = coalescer
+ let out_small1 = coalescer
+ .next_completed_batch()
+ .expect("expected small batch 1");
+ assert_eq!(out_small1.num_rows(), 2);
+ assert_eq!(
+ normalize_batch(out_small1),
+ normalize_batch(uint32_batch(5000..5002))
+ );
+
+ // Finish to get remaining 3 rows from small2
+ coalescer.finish_buffered_batch().unwrap();
+ let out_small2 = coalescer
.next_completed_batch()
- .expect("expected small batch");
- assert_eq!(out_small.num_rows(), 5);
+ .expect("expected small batch 2");
+ assert_eq!(out_small2.num_rows(), 3);
assert_eq!(
- normalize_batch(out_small),
- normalize_batch(uint32_batch(5000..5005))
+ normalize_batch(out_small2),
+ normalize_batch(uint32_batch(5002..5005))
);
}
}
Thank you @alamb, I did some experiments, but the exact-size approach still gives the best benchmark performance so far; I will investigate more.
I think this PR is superseded. Marking it as a draft as I am trying to clean up the review queue.
Which issue does this PR close?

Related to:
- apache/datafusion#17105
- Draft: Use upstream arrow `coalesce` kernel in DataFusion datafusion#16249
- Optimize take/filter/concat from multiple input arrays to a single large output array #6692
- Enable parquet filter pushdown (`filter_pushdown`) by default datafusion#3463

Rationale for this change
We want to stay consistent with the original BatchCoalescer behaviour in DataFusion, so we introduce a new config to support a non-exact-size mode, which means output batches are emitted once the buffered rows reach target_batch_size, and a produced batch may therefore be larger than target_batch_size.
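A minimal usage sketch of the proposed configuration (a sketch, not the final API: `with_exact_size` is the builder method added by this PR, the `arrow_select::coalesce` path follows the file changed here, and the non-strict assertion follows the PR's original semantics rather than the early-emit experiment discussed above):

```rust
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]))],
    )?;

    // Default behaviour (exact_size = true): all but the last output batch
    // have exactly `target_batch_size` rows.
    let mut exact = BatchCoalescer::new(Arc::clone(&schema), 4);
    exact.push_batch(batch.clone())?;
    exact.finish_buffered_batch()?;
    let sizes: Vec<_> = std::iter::from_fn(|| exact.next_completed_batch())
        .map(|b| b.num_rows())
        .collect();
    assert_eq!(sizes, vec![4, 2]);

    // Proposed in this PR (exact_size = false): emit once the buffered rows reach
    // the target; the emitted batch may be larger than `target_batch_size`.
    let mut non_strict = BatchCoalescer::new(Arc::clone(&schema), 4).with_exact_size(false);
    non_strict.push_batch(batch)?;
    let out = non_strict.next_completed_batch().expect("buffered >= target");
    assert_eq!(out.num_rows(), 6);
    Ok(())
}
```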
What changes are included in this PR?
Are these changes tested?
Yes, new tests added.
Are there any user-facing changes?
No, the default behaviour is still `exact_size = true`.